---
layout: post
date: 2017-09-16
title: "Learning Elixir's Supervisors"
description: "An introduction to Elixir's Supervisors and Process Linking and Monitoring"
tags: [elixir, design]
---

<p>One of the things I struggled with when building our first Elixir application was getting our Rabbit code to be properly supervised and resilient to failure. The pattern we generally follow is 1 connection and N workers, where each worker has a channel derived from the connection. The workers, which can be publishers (we normally only have 1) or consumers (we normally have many), are dependent on the connection but independent of each other.</p>

<p>My first iteration worked, but wasn't the most elegant. I've since had a chance to revisit the code and thought it would make for good writing (and perhaps good reading).</p>

<p>Given the dependencies described above, the first thing we can do is build a supervisor tree:</p>


{% highlight elixir %}defmodule App.Queue.Supervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    children = [
      worker(App.Queue.Connection, []),
      supervisor(App.Queue.WorkerSupervisor, [])
    ]

    supervise(children, strategy: :rest_for_one)
  end
end

defmodule App.Queue.WorkerSupervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    children = [
      worker(App.Queue.Publisher, []),
      worker(App.Queue.Consumers.Spice, []),
      worker(App.Queue.Consumers.Saiyans, []),
    ]
    supervise(children, strategy: :one_for_one)
  end
end{% endhighlight %}

<p>These are split up so that the correct <code>strategy</code> can be applied to each group. Starting from the bottom, if one worker is forced to restart, there's no reason to also restart the other workers. Thus, we pick the <code>one_for_one</code> strategy. The <code>rest_for_one</code> strategy that we use in our first Supervisor means that if one process fails and is restarted, all processes defined after it are also restarted. For this reason, we put the connection first. If the <code>App.Queue.Connection</code> process is restarted, then the <code>App.Queue.WorkerSupervisor</code> will also be restarted. But the reverse isn't true.</p>

<p>If we were to put all of the above in a single supervisor and kept the <code>rest_for_one</code> strategy, it would mean that the <code>Saiyans</code> consumer would be restarted whenever the <code>Spice</code> consumer was. This isn't what we want.</p>

<p>The above is a start, but now we need to dig into our actual processes. The first one that we'll look at, and also the most critical, is the <code>App.Queue.Connection</code> process. We'll be using this <a href="https://github.com/pma/amqp">AMQP</a> library. Our following examples will expose you to the parts of its API that you need to know with respect to this post.</p>

<p>A naïve implementation of our connection worker could look like:</p>

{% highlight elixir %}defmodule App.Queue.Connection do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    # returns {:ok, conn} on success and {:error, err} on failure
    AMQP.Connection.open("amqp://....")
  end
end{% endhighlight %}

<p>The issue with this code is that if the connection fails, nothing happens until we try to use it. The supervisor is monitoring our process, not the connection. We could change the code to do:</p>

{% highlight elixir %}def init(_) do
  {:ok, conn} = AMQP.Connection.open("amqp://....")
  Process.link(conn.pid)
  {:ok, conn}
end{% endhighlight %}

<p>By linking our Connection process to the underlying connection, we ensure that when the underlying connection dies, our Connection process will also die. At first glance, this might seem like the right approach. Our process will die, the supervisor will restart it, and it'll then restart each worker, which can use the new connection to get new channels. However, it probably isn't what you want.</p>

<p>By default, if a supervisor has to restart its children more than 3 times within 5 seconds, the supervisor itself, in our case <code>App.Queue.Supervisor</code>, terminates. This cascades up: the parent of <code>App.Queue.Supervisor</code> will try to restart that supervisor, and will itself terminate if it hits the same 3 in 5 threshold. This goes all the way up to the application.</p>

<p>It's common for such connection failures, whether they're to a queue, database or web service, to happen quickly. Failing 3 times in 5 seconds? We're likely to fail 1000 times in a second, taking down our entire app. Sadly, there's no backoff functionality in supervisors. In some cases, maybe you really do want to crash the entire app, but I think that for many apps, retrying indefinitely is a more desirable behaviour.</p>

<p>One quick solution could be to specify very high <code>max_restarts</code> and very low <code>max_seconds</code> parameters along with the <code>strategy</code>. But I'd like to explore something a little more elegant.</p>
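<p>As a rough sketch of that quick solution, the two options sit alongside <code>strategy</code> in the call to <code>supervise</code>. The values below are arbitrary placeholders chosen for illustration, not recommendations:</p>

{% highlight elixir %}defmodule App.Queue.Supervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    children = [
      worker(App.Queue.Connection, []),
      supervisor(App.Queue.WorkerSupervisor, [])
    ]

    # Tolerate up to 1_000 restarts within any 1 second window before this
    # supervisor gives up. Both numbers are illustrative assumptions.
    supervise(children, strategy: :rest_for_one, max_restarts: 1_000, max_seconds: 1)
  end
end{% endhighlight %}

<p>This only raises the ceiling. A connection that fails fast enough can still exceed it, and nothing slows the retries down.</p>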

<p>Rather than linking to the underlying connection process, we can monitor it with <code>Process.monitor/1</code>. Instead of having the termination bubble up to us, we'll receive a message. Consider:</p>

{% highlight elixir %}require Logger # Logger.error/1 is a macro, so the module has to require Logger

def init(_) do
  {:ok, connect()}
end

defp connect() do
  case AMQP.Connection.open() do
    {:ok, conn} ->
      Process.monitor(conn.pid)
      conn  # this will be our state
    {:error, err} ->
      Logger.error("failed to connect: #{inspect err}")
      :timer.sleep(1000)
      connect() # keep trying
  end
end

# we get this message because of the call to monitor we did
def handle_info({:DOWN, _, :process, _pid, reason}, _) do
  Logger.error("queue connection is down: #{inspect reason}")
  {:noreply, connect()}
end{% endhighlight %}

<p>Rather than terminating, we handle the <code>:DOWN</code> notification and try to reconnect, waiting 1 second between attempts. Essentially, we're taking over from the opinionated Supervisor's restart strategy: we retry indefinitely and can apply whatever backoff algorithm we want.</p>
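<p>To make "whatever backoff algorithm we want" a little more concrete, here's a minimal sketch of a capped exponential backoff. The <code>App.Queue.Backoff</code> module, its <code>delay/1</code> function and the 30 second cap are all hypothetical names and values of mine; nothing here comes from the AMQP library.</p>

{% highlight elixir %}defmodule App.Queue.Backoff do
  # Hypothetical helper for illustration only.
  import Bitwise, only: [bsl: 2]

  @base_ms 1_000  # first retry waits 1 second, like the fixed sleep in connect/0
  @max_ms 30_000  # assumed upper bound on the wait

  # attempt is 0 for the first retry, 1 for the second, and so on.
  # Returns the number of milliseconds to wait before that retry.
  def delay(attempt) when is_integer(attempt) and attempt >= 0 do
    # Cap the exponent so the shift never produces a huge integer.
    exponent = min(attempt, 10)
    min(@base_ms * bsl(1, exponent), @max_ms)
  end
end{% endhighlight %}

<p>One way to use it would be to have <code>connect</code> take an attempt counter, call <code>:timer.sleep(App.Queue.Backoff.delay(attempt))</code> in the error branch, and recurse with <code>attempt + 1</code>. The waits would then be 1, 2, 4, 8 and 16 seconds, and 30 seconds from there on.</p>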

<p>We could apply the same strategy to our workers, having each monitor its channel's process. Instead, I want to show why, in this case, it's OK to link rather than monitor:</p>

{% highlight elixir %}defmodule App.Queue.Publisher do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    {:ok, channel} = App.Queue.Connection.open_channel()
    Process.link(channel.pid)
    {:ok, channel}
  end
end{% endhighlight %}

<p>We'll look at <code>open_channel</code> in a moment, but as you read it, I want you to try to figure out why the above is safe. If Rabbit goes down, why won't this fail 3 times in 5 seconds?</p>

{% highlight elixir %}defmodule App.Queue.Connection do
  ...

  def open_channel() do
    GenServer.call(__MODULE__, :open_channel)
  end

  def handle_call(:open_channel, _from, conn) do
    {:reply, AMQP.Channel.open(conn), conn}
  end
  
  ...
end{% endhighlight %}

<p>Do you see it? It's a little tricky to follow, so let's walk through it. Let's assume the app is running when suddenly our connection to Rabbit dies. This causes our channel to terminate (in Rabbit, a channel dies when its connection dies), which in turn causes our Publisher to terminate (since it was linked to the channel process). The <code>WorkerSupervisor</code> detects this and restarts the Publisher. As part of its startup process, in its <code>init/1</code> function, the process asks the Connection for a channel. This is a blocking call and one of two things will happen: either a channel will be returned, or the call will time out (after the default 5 seconds, since we didn't specify a timeout when we called <code>GenServer.call</code>).</p>

<p>Why will it time out? Because so long as our link to Rabbit is down, the Connection process is going to be stuck in its <code>connect</code> loop and can't handle any other messages. If the connection isn't established, pending calls will time out. If the connection is re-established, a channel is returned. With a timeout of 5 seconds, each worker will fail at most once every 5 seconds, thus avoiding the supervisor's limits.</p>
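<p>If you'd like to see that timeout behaviour in isolation, here's a small sketch that doesn't need Rabbit at all. <code>SlowConnection</code> and <code>TimeoutDemo</code> are made-up names, and the sleep in <code>handle_call</code> merely stands in for a Connection that's busy in its <code>connect</code> loop.</p>

{% highlight elixir %}defmodule SlowConnection do
  use GenServer

  # busy_ms is how long the fake connection stays unresponsive per call.
  def start_link(busy_ms), do: GenServer.start_link(__MODULE__, busy_ms)

  def init(busy_ms), do: {:ok, busy_ms}

  # Sleeping here stands in for being stuck reconnecting. While it sleeps,
  # the process can't reply, so the caller's timeout is what ends the wait.
  def handle_call(:open_channel, _from, busy_ms) do
    :timer.sleep(busy_ms)
    {:reply, {:ok, :fake_channel}, busy_ms}
  end
end

defmodule TimeoutDemo do
  # Returns the server's reply, or :timed_out if it didn't answer in time.
  # The exit is caught only so the demo can report it.
  def open_channel(server, timeout) do
    try do
      GenServer.call(server, :open_channel, timeout)
    catch
      :exit, {:timeout, _} -> :timed_out
    end
  end
end{% endhighlight %}

<p>Calling <code>TimeoutDemo.open_channel/2</code> with a timeout shorter than the server's sleep gives back <code>:timed_out</code>. Our real workers don't catch the exit. They let it crash them, which is exactly what limits each worker to one failure per timeout period.</p>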

<p>Hopefully this not only provides some practical code for keeping Rabbit connections and channels alive, but also illustrates how supervisors, process linking and process monitoring work and can be leveraged.</p>
